---
title: "分支、合并、条件 | 工作流 | Kastrax 文档"
description: "Kastrax 工作流中的控制流允许您管理分支、合并和条件，以构建满足您逻辑需求的工作流。"
---

# 工作流中的控制流：分支、合并和条件 ✅

当您创建多步骤流程时，您可能需要并行运行步骤、按顺序链接它们，或者根据结果遵循不同的路径。本页描述了如何管理分支、合并和条件，以构建满足您逻辑需求的工作流。代码片段展示了构建复杂控制流的关键模式。

## 并行执行 ✅

如果步骤之间没有依赖关系，您可以同时运行多个步骤。当步骤执行独立任务时，这种方法可以加速您的工作流。下面的代码展示了如何并行添加两个步骤：

```typescript
myWorkflow.step(fetchUserData).step(fetchOrderData);
```

有关更多详细信息，请参见[并行步骤](../../examples/workflows/parallel-steps.mdx)示例。

## 顺序执行 ✅

有时您需要按严格顺序运行步骤，以确保一个步骤的输出成为下一个步骤的输入。使用 .then() 链接依赖操作。下面的代码展示了如何按顺序链接步骤：

```typescript
myWorkflow.step(fetchOrderData).then(validateData).then(processOrder);
```

有关更多详细信息，请参见[顺序步骤](../../examples/workflows/sequential-steps.mdx)示例。

## 分支和合并路径 ✅

当不同的结果需要不同的路径时，分支很有用。一旦完成，您也可以稍后合并路径。下面的代码展示了如何在 stepA 之后分支，然后在 stepF 上汇合：

```typescript
myWorkflow
  .step(stepA)
    .then(stepB)
    .then(stepD)
  .after(stepA)
    .step(stepC)
    .then(stepE)
  .after([stepD, stepE])
    .step(stepF);
```

在此示例中：

- stepA 导致 stepB，然后到 stepD。
- 单独地，stepA 也触发 stepC，而 stepC 又导致 stepE。
- 单独地，当 stepD 和 stepE 都完成时，触发 stepF。

有关更多详细信息，请参见[分支路径](../../examples/workflows/branching-paths.mdx)示例。

## 合并多个分支 ✅

有时您需要一个步骤仅在多个其他步骤完成后执行。Kastrax 提供了一个复合 `.after([])` 语法，允许您为一个步骤指定多个依赖项。

```typescript
myWorkflow
  .step(fetchUserData)
  .then(validateUserData)
  .step(fetchProductData)
  .then(validateProductData)
  // 此步骤仅在 validateUserData 和 validateProductData 都完成后运行
  .after([validateUserData, validateProductData])
  .step(processOrder)
```

在此示例中：
- `fetchUserData` 和 `fetchProductData` 在并行分支中运行
- 每个分支都有自己的验证步骤
- `processOrder` 步骤仅在两个验证步骤都成功完成后执行

这种模式特别适用于：
- 连接并行执行路径
- 在工作流中实现同步点
- 确保在继续之前所有必需的数据都可用

您还可以通过组合多个 `.after([])` 调用来创建复杂的依赖模式：

```typescript
myWorkflow
  // 第一个分支
  .step(stepA)
  .then(stepB)
  .then(stepC)

  // 第二个分支
  .step(stepD)
  .then(stepE)

  // 第三个分支
  .step(stepF)
  .then(stepG)

  // 此步骤依赖于多个分支的完成
  .after([stepC, stepE, stepG])
  .step(finalStep)
```

## 循环依赖和循环 ✅

工作流通常需要重复步骤，直到满足特定条件。Kastrax 提供了两种强大的方法来创建循环：`until` 和 `while`。这些方法提供了一种直观的方式来实现重复任务。

### 使用手动循环依赖（传统方法）

在早期版本中，您可以通过手动定义带有条件的循环依赖来创建循环：

```typescript
myWorkflow
  .step(fetchData)
  .then(processData)
  .after(processData)
  .step(finalizeData, {
    when: { "processData.status": "success" },
  })
  .step(fetchData, {
    when: { "processData.status": "retry" },
  });
```

虽然这种方法仍然有效，但较新的 `until` 和 `while` 方法提供了一种更清晰和更易于维护的方式来创建循环。

### 使用 `until` 进行基于条件的循环

`until` 方法重复一个步骤，直到指定的条件变为真。它接受以下参数：
1. 确定何时停止循环的条件
2. 要重复的步骤
3. 可选的变量，传递给重复的步骤

```typescript
// 递增计数器直到达到目标的步骤
const incrementStep = new Step({
  id: 'increment',
  inputSchema: z.object({
    // 当前计数器值
    counter: z.number().optional(),
  }),
  outputSchema: z.object({
    // 更新后的计数器值
    updatedCounter: z.number(),
  }),
  execute: async ({ context }) => {
    const { counter = 0 } = context.inputData;
    return { updatedCounter: counter + 1 };
  },
});

workflow
  .step(incrementStep)
  .until(
    async ({ context }) => {
      // 当计数器达到 10 时停止
      const result = context.getStepResult(incrementStep);
      return (result?.updatedCounter ?? 0) >= 10;
    },
    incrementStep,
    {
      // 将当前计数器传递给下一次迭代
      counter: {
        step: incrementStep,
        path: 'updatedCounter'
      }
    }
  )
  .then(finalStep);
```

您也可以使用基于引用的条件：

```typescript
workflow
  .step(incrementStep)
  .until(
    {
      ref: { step: incrementStep, path: 'updatedCounter' },
      query: { $gte: 10 },
    },
    incrementStep,
    {
      counter: {
        step: incrementStep,
        path: 'updatedCounter'
      }
    }
  )
  .then(finalStep);
```

### 使用 `while` 进行基于条件的循环

`while` 方法重复一个步骤，只要指定的条件保持为真。它接受与 `until` 相同的参数：
1. 确定何时继续循环的条件
2. 要重复的步骤
3. 可选的变量，传递给重复的步骤

```typescript
// 在低于目标时递增计数器的步骤
const incrementStep = new Step({
  id: 'increment',
  inputSchema: z.object({
    // 当前计数器值
    counter: z.number().optional(),
  }),
  outputSchema: z.object({
    // 更新后的计数器值
    updatedCounter: z.number(),
  }),
  execute: async ({ context }) => {
    const { counter = 0 } = context.inputData;
    return { updatedCounter: counter + 1 };
  },
});

workflow
  .step(incrementStep)
  .while(
    async ({ context }) => {
      // 当计数器小于 10 时继续
      const result = context.getStepResult(incrementStep);
      return (result?.updatedCounter ?? 0) < 10;
    },
    incrementStep,
    {
      // 将当前计数器传递给下一次迭代
      counter: {
        step: incrementStep,
        path: 'updatedCounter'
      }
    }
  )
  .then(finalStep);
```

您也可以使用基于引用的条件：

```typescript
workflow
  .step(incrementStep)
  .while(
    {
      ref: { step: incrementStep, path: 'updatedCounter' },
      query: { $lt: 10 },
    },
    incrementStep,
    {
      counter: {
        step: incrementStep,
        path: 'updatedCounter'
      }
    }
  )
  .then(finalStep);
```

### 引用条件的比较运算符

使用基于引用的条件时，您可以使用以下比较运算符：

| 运算符 | 描述 |
|----------|-------------|
| `$eq`    | 等于    |
| `$ne`    | 不等于 |
| `$gt`    | 大于 |
| `$gte`   | 大于或等于 |
| `$lt`    | 小于 |
| `$lte`   | 小于或等于 |

## 条件 ✅

使用 when 属性根据先前步骤的数据控制步骤是否运行。以下是指定条件的三种方式。

### 选项 1：函数

```typescript
myWorkflow.step(
  new Step({
    id: "processData",
    execute: async ({ context }) => {
      // 动作逻辑
    },
  }),
  {
    when: async ({ context }) => {
      const fetchData = context?.getStepResult<{ status: string }>("fetchData");
      return fetchData?.status === "success";
    },
  },
);
```

### 选项 2：查询对象

```typescript
myWorkflow.step(
  new Step({
    id: "processData",
    execute: async ({ context }) => {
      // 动作逻辑
    },
  }),
  {
    when: {
      ref: {
        step: {
          id: "fetchData",
        },
        path: "status",
      },
      query: { $eq: "success" },
    },
  },
);
```

### 选项 3：简单路径比较

```typescript
myWorkflow.step(
  new Step({
    id: "processData",
    execute: async ({ context }) => {
      // 动作逻辑
    },
  }),
  {
    when: {
      "fetchData.status": "success",
    },
  },
);
```

## 数据访问模式 ✅

Kastrax 提供了几种在步骤之间传递数据的方式：

1. **上下文对象** - 通过上下文对象直接访问步骤结果
2. **变量映射** - 显式地将一个步骤的输出映射到另一个步骤的输入
3. **getStepResult 方法** - 类型安全的方法来检索步骤输出

每种方法根据您的用例和类型安全要求都有其优势。

### 使用 getStepResult 方法

`getStepResult` 方法提供了一种类型安全的方式来访问步骤结果。在使用 TypeScript 时，这是推荐的方法，因为它保留了类型信息。

#### 基本用法

为了更好的类型安全，您可以为 `getStepResult` 提供类型参数：

```typescript showLineNumbers filename="src/kastrax/workflows/get-step-result.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: 'fetchUser',
  outputSchema: z.object({
    name: z.string(),
    userId: z.string(),
  }),
  execute: async ({ context }) => {
    return { name: 'John Doe', userId: '123' };
  },
});

const analyzeDataStep = new Step({
  id: "analyzeData",
  execute: async ({ context }) => {
    // 类型安全地访问先前步骤结果
    const userData = context.getStepResult<{ name: string, userId: string }>("fetchUser");

    if (!userData) {
      return { status: "error", message: "User data not found" };
    }

    return {
      analysis: `Analyzed data for user ${userData.name}`,
      userId: userData.userId
    };
  },
});
```


#### 使用步骤引用

最类型安全的方法是在 `getStepResult` 调用中直接引用步骤：

```typescript showLineNumbers filename="src/kastrax/workflows/step-reference.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// 定义带有输出模式的步骤
const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processUserStep = new Step({
  id: "processUser",
  execute: async ({ context }) => {
    // TypeScript 将从 fetchUserStep 的 outputSchema 推断正确的类型
    const userData = context.getStepResult(fetchUserStep);

    return {
      processed: true,
      userName: userData?.name
    };
  },
});

const workflow = new Workflow({
  name: "user-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processUserStep)
  .commit();
```




### 使用变量映射

变量映射是定义步骤之间数据流的显式方式。
这种方法使依赖关系清晰，并提供良好的类型安全。
注入到步骤中的数据在 `context.inputData` 对象中可用，并基于步骤的 `inputSchema` 进行类型化。

```typescript showLineNumbers filename="src/kastrax/workflows/variable-mapping.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const sendEmailStep = new Step({
  id: "sendEmail",
  inputSchema: z.object({
    recipientEmail: z.string(),
    recipientName: z.string(),
  }),
  execute: async ({ context }) => {
    const { recipientEmail, recipientName } = context.inputData;

    // 发送电子邮件逻辑在这里
    return {
      status: "sent",
      to: recipientEmail
    };
  },
});

const workflow = new Workflow({
  name: "email-workflow",
});

workflow
  .step(fetchUserStep)
  .then(sendEmailStep, {
    variables: {
      // 将 fetchUser 的特定字段映射到 sendEmail 输入
      recipientEmail: { step: fetchUserStep, path: 'email' },
      recipientName: { step: fetchUserStep, path: 'name' }
    }
  })
  .commit();
```

有关变量映射的更多详细信息，请参见[使用工作流变量进行数据映射](./variables.mdx)文档。

### 使用上下文对象

上下文对象提供了对所有步骤结果及其输出的直接访问。这种方法更灵活，但需要谨慎处理以维持类型安全。
您可以通过 `context.steps` 对象直接访问步骤结果：

```typescript showLineNumbers filename="src/kastrax/workflows/context-access.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const processOrderStep = new Step({
  id: 'processOrder',
  execute: async ({ context }) => {
    // 访问先前步骤的数据
    let userData: { name: string, userId: string };
    if (context.steps['fetchUser']?.status === 'success') {
      userData = context.steps.fetchUser.output;
    } else {
      throw new Error('User data not found');
    }

    return {
      orderId: 'order123',
      userId: userData.userId,
      status: 'processing',
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .commit();
```

### 工作流级别的类型安全

为了在整个工作流中实现全面的类型安全，您可以为所有步骤定义类型并将它们传递给工作流
这允许您在条件上获得上下文对象的类型安全，以及在最终工作流输出中获得步骤结果的类型安全。

```typescript showLineNumbers filename="src/kastrax/workflows/workflow-typing.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";


// 创建带有类型化输出的步骤
const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processOrderStep = new Step({
  id: "processOrder",
  execute: async ({ context }) => {
    // TypeScript 知道 userData 的形状
    const userData = context.getStepResult(fetchUserStep);

    return {
      orderId: "order123",
      status: "processing"
    };
  },
});

const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
  name: "typed-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .until(async ({ context }) => {
    // TypeScript 在这里知道 userData 的形状
    const res = context.getStepResult('fetchUser');
    return res?.userId === '123';
  }, processOrderStep)
  .commit();
```

### 访问触发器数据

除了步骤结果外，您还可以访问启动工作流的原始触发器数据：

```typescript showLineNumbers filename="src/kastrax/workflows/trigger-data.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// 定义触发器模式
const triggerSchema = z.object({
  customerId: z.string(),
  orderItems: z.array(z.string()),
});

type TriggerType = z.infer<typeof triggerSchema>;

const processOrderStep = new Step({
  id: "processOrder",
  execute: async ({ context }) => {
    // 类型安全地访问触发器数据
    const triggerData = context.getStepResult<TriggerType>('trigger');

    return {
      customerId: triggerData?.customerId,
      itemCount: triggerData?.orderItems.length || 0,
      status: "processing"
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
  triggerSchema,
});

workflow
  .step(processOrderStep)
  .commit();
```

### 访问恢复数据

注入到步骤中的数据在 `context.inputData` 对象中可用，并基于步骤的 `inputSchema` 进行类型化。

```typescript showLineNumbers filename="src/kastrax/workflows/resume-data.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const processOrderStep = new Step({
  id: "processOrder",
  inputSchema: z.object({
    orderId: z.string(),
  }),
  execute: async ({ context, suspend }) => {
    const { orderId } = context.inputData;

    if (!orderId) {
      await suspend();
      return;
    }

    return {
      orderId,
      status: "processed"
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
});

workflow
  .step(processOrderStep)
  .commit();

const run = workflow.createRun();
const result = await run.start();

const resumedResult = await workflow.resume({
  runId: result.runId,
  stepId: 'processOrder',
  inputData: {
    orderId: '123',
  },
});

console.log({resumedResult});
```

### 访问工作流结果

您可以通过将步骤类型注入到 `Workflow` 类型参数中，获得对工作流结果的类型化访问：

```typescript showLineNumbers filename="src/kastrax/workflows/get-results.ts" copy
import { Workflow } from "@kastrax/core/workflows";

const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processOrderStep = new Step({
  id: "processOrder",
  outputSchema: z.object({
    orderId: z.string(),
    status: z.string(),
  }),
  execute: async ({ context }) => {
    const userData = context.getStepResult(fetchUserStep);
    return {
      orderId: "order123",
      status: "processing"
    };
  },
});

const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
  name: "typed-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .commit();

const run = workflow.createRun();
const result = await run.start();

// 结果是步骤结果的判别联合
// 所以需要通过状态检查来缩小范围
if (result.results.processOrder.status === 'success') {
  // TypeScript 将知道结果的形状
  const orderId = result.results.processOrder.output.orderId;
  console.log({orderId});
}

if (result.results.fetchUser.status === 'success') {
  const userId = result.results.fetchUser.output.userId;
  console.log({userId});
}
```

### 数据流的最佳实践

1. **使用带有步骤引用的 getStepResult 以获得类型安全**
   - 确保 TypeScript 可以推断正确的类型
   - 在编译时捕获类型错误

2. **使用变量映射以获得显式依赖**
   - 使数据流清晰和可维护
   - 提供步骤依赖的良好文档

3. **为步骤定义输出模式**
   - 在运行时验证数据
   - 验证 `execute` 函数的返回类型
   - 改进 TypeScript 中的类型推断

4. **优雅地处理缺失数据**
   - 在访问属性之前始终检查步骤结果是否存在
   - 为可选数据提供回退值

5. **保持数据转换简单**
   - 在专用步骤中转换数据，而不是在变量映射中
   - 使工作流更容易测试和调试

### 数据流方法的比较

| 方法 | 类型安全 | 显式性 | 用例 |
|--------|------------|--------------|----------|
| getStepResult | 最高 | 高 | 具有严格类型要求的复杂工作流 |
| 变量映射 | 高 | 高 | 当依赖需要清晰和显式时 |
| context.steps | 中等 | 低 | 在简单工作流中快速访问步骤数据 |

通过为您的用例选择正确的数据流方法，您可以创建既类型安全又可维护的工作流。
